#!/usr/bin/env python
# License: GPL v3 Copyright: 2016, Kovid Goyal <kovid at kovidgoyal.net>

import fcntl
import math
import os
import re
import string
import sys
from collections.abc import Callable, Generator, Iterable, Iterator, Mapping, Sequence
from contextlib import contextmanager, suppress
from functools import lru_cache
from re import Match, Pattern
from typing import (
    TYPE_CHECKING,
    Any,
    BinaryIO,
    NamedTuple,
    Optional,
    cast,
)

from .constants import (
    clear_handled_signals,
    config_dir,
    is_macos,
    is_wayland,
    kitten_exe,
    runtime_dir,
    shell_path,
    ssh_control_master_template,
)
from .fast_data_types import WINDOW_FULLSCREEN, WINDOW_HIDDEN, WINDOW_MAXIMIZED, WINDOW_MINIMIZED, WINDOW_NORMAL, Color, Shlex, get_options, monotonic, open_tty
from .fast_data_types import timed_debug_print as _timed_debug_print
from .types import run_once
from .typing_compat import AddressFamily, PopenType, StartupCtx

if TYPE_CHECKING:
    import tarfile

    from .fast_data_types import OSWindowSize
    from .options.types import Options
else:
    Options = object


class Flag:

    def __init__(self, initial_val: bool = True) -> None:
        self.val = initial_val

    def __enter__(self) -> None:
        self.val ^= True

    def __exit__(self, *a: object) -> None:
        self.val ^= True

    def __bool__(self) -> bool:
        return self.val


disallow_expand_vars = Flag(False)


def expandvars(val: str, env: Mapping[str, str] = {}, fallback_to_os_env: bool = True) -> str:
    '''
    Expand $VAR and ${VAR} Use $$ for a literal $
    '''

    def sub(m: 'Match[str]') -> str:
        key = m.group(1) or m.group(2)
        result = env.get(key)
        if result is None and fallback_to_os_env:
            result = os.environ.get(key)
        if result is None:
            result = m.group()
        return result

    if disallow_expand_vars or '$' not in val:
        return val

    return re.sub(r'\$(?:(\w+)|\{([^}]+)\})', sub, val.replace('$$', '\0')).replace('\0', '$')


@lru_cache(maxsize=2)
def sgr_sanitizer_pat(for_splitting: bool = False) -> 're.Pattern[str]':
    pat = '\033\\[.*?m'
    if for_splitting:
        return re.compile(f'({pat})')
    return re.compile(pat)


@run_once
def kitty_ansi_sanitizer_pat() -> 're.Pattern[str]':
    # removes ANSI sequences generated by kitty's ANSI output routines. Not
    # suitable for stripping general ANSI sequences
    return re.compile(r'\x1b(?:\[[0-9;:]*?m|\].*?\x1b\\)')


def platform_window_id(os_window_id: int) -> int | None:
    if is_macos:
        from .fast_data_types import cocoa_window_id
        with suppress(Exception):
            return cocoa_window_id(os_window_id)
    if not is_wayland():
        from .fast_data_types import x11_window_id
        with suppress(Exception):
            return x11_window_id(os_window_id)
    return None


def safe_print(*a: Any, **k: Any) -> None:
    with suppress(Exception):
        print(*a, **k)


def log_error(*a: Any, **k: str) -> None:
    from .fast_data_types import log_error_string
    output = getattr(log_error, 'redirect', log_error_string)
    with suppress(Exception):
        msg = k.get('sep', ' ').join(map(str, a)) + k.get('end', '')
        output(msg)


@contextmanager
def suppress_error_logging() -> Iterator[None]:
    before = getattr(log_error, 'redirect', suppress_error_logging)
    setattr(log_error, 'redirect', lambda *a: None)
    try:
        yield
    finally:
        if before is suppress_error_logging:
            delattr(log_error, 'redirect')
        else:
            setattr(log_error, 'redirect', before)


def ceil_int(x: float) -> int:
    return int(math.ceil(x))


def sanitize_title(x: str) -> str:
    return re.sub(r'\s+', ' ', re.sub(r'[\0-\x19\x80-\x9f]', '', x))


def color_as_int(val: Color) -> int:
    return int(val) & 0xffffff


def color_from_int(val: int) -> Color:
    return Color((val >> 16) & 0xFF, (val >> 8) & 0xFF, val & 0xFF)


class ScreenSize(NamedTuple):
    rows: int
    cols: int
    width: int
    height: int
    cell_width: int
    cell_height: int


def read_screen_size(fd: int = -1) -> ScreenSize:
    import array
    import fcntl
    import termios
    buf = array.array('H', [0, 0, 0, 0])
    if fd < 0:
        fd = sys.stdout.fileno()
    fcntl.ioctl(fd, termios.TIOCGWINSZ, cast(bytearray, buf))
    rows, cols, width, height = tuple(buf)
    cell_width, cell_height = width // (cols or 1), height // (rows or 1)
    return ScreenSize(rows, cols, width, height, cell_width, cell_height)


class ScreenSizeGetter:
    changed = True
    Size = ScreenSize
    ans: ScreenSize | None = None

    def __init__(self, fd: int | None):
        if fd is None:
            fd = sys.stdout.fileno()
        self.fd = fd

    def __call__(self) -> ScreenSize:
        if self.changed:
            self.ans = read_screen_size(self.fd)
            self.changed = False
        return cast(ScreenSize, self.ans)


@lru_cache(maxsize=64, typed=True)
def screen_size_function(fd: int | None = None) -> ScreenSizeGetter:
    return ScreenSizeGetter(fd)


def fit_image(width: int, height: int, pwidth: int, pheight: int) -> tuple[int, int]:
    from math import floor
    if height > pheight:
        corrf = pheight / float(height)
        width, height = floor(corrf * width), pheight
    if width > pwidth:
        corrf = pwidth / float(width)
        width, height = pwidth, floor(corrf * height)
    if height > pheight:
        corrf = pheight / float(height)
        width, height = floor(corrf * width), pheight

    return int(width), int(height)


def base64_encode(
    integer: int,
    chars: str = string.ascii_uppercase + string.ascii_lowercase + string.digits +
    '+/'
) -> str:
    ans = ''
    while True:
        integer, remainder = divmod(integer, 64)
        ans = chars[remainder] + ans
        if integer == 0:
            break
    return ans


def command_for_open(program: str | list[str] = 'default') -> list[str]:
    if isinstance(program, str):
        from .conf.utils import to_cmdline
        program = to_cmdline(program)
    if program == ['default']:
        cmd = ['open'] if is_macos else ['xdg-open']
    else:
        cmd = program
    return cmd


def open_cmd(cmd: Iterable[str] | list[str], arg: None | Iterable[str] | str = None,
             cwd: str | None = None, extra_env: dict[str, str] | None = None) -> 'PopenType[bytes]':
    import subprocess
    if arg is not None:
        cmd = list(cmd)
        if isinstance(arg, str):
            cmd.append(arg)
        else:
            cmd.extend(arg)
    env: dict[str, str] | None = None
    if extra_env:
        env = os.environ.copy()
        env.update(extra_env)
    return subprocess.Popen(
        tuple(cmd), stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, cwd=cwd or None,
        preexec_fn=clear_handled_signals, env=env)


def open_url(url: str, program: str | list[str] = 'default', cwd: str | None = None, extra_env: dict[str, str] | None = None) -> 'PopenType[bytes]':
    return open_cmd(command_for_open(program), url, cwd=cwd, extra_env=extra_env)


def init_startup_notification_x11(window_handle: int, startup_id: str | None = None) -> Optional['StartupCtx']:
    # https://specifications.freedesktop.org/startup-notification-spec/startup-notification-latest.txt
    from kitty.fast_data_types import init_x11_startup_notification
    sid = startup_id or os.environ.pop('DESKTOP_STARTUP_ID', None)  # ensure child processes don't get this env var
    if not sid:
        return None
    from .fast_data_types import x11_display
    display = x11_display()
    if not display:
        return None
    return init_x11_startup_notification(display, window_handle, sid)


def end_startup_notification_x11(ctx: 'StartupCtx') -> None:
    from kitty.fast_data_types import end_x11_startup_notification
    end_x11_startup_notification(ctx)


def init_startup_notification(window_handle: int | None, startup_id: str | None = None) -> Optional['StartupCtx']:
    if is_macos or is_wayland():
        return None
    if window_handle is None:
        log_error('Could not perform startup notification as window handle not present')
        return None
    try:
        try:
            return init_startup_notification_x11(window_handle, startup_id)
        except OSError as e:
            if not str(e).startswith("Failed to load libstartup-notification"):
                raise e
            log_error(
                f'{e}. This has two main effects:',
                'There will be no startup feedback and when using --single-instance, kitty windows may start on an incorrect desktop/workspace.')
    except Exception:
        import traceback
        traceback.print_exc()
    return None


def end_startup_notification(ctx: Optional['StartupCtx']) -> None:
    if not ctx:
        return
    if is_macos or is_wayland():
        return
    try:
        end_startup_notification_x11(ctx)
    except Exception:
        import traceback
        traceback.print_exc()


class startup_notification_handler:

    # WARNING: This only works on X11 on other platforms extra_callback will be called
    # after the window is shown, not before, as they do not do two stage window
    # creation.

    def __init__(self, do_notify: bool = True, startup_id: str | None = None, extra_callback: Callable[[int], None] | None = None):
        self.do_notify = do_notify
        self.startup_id = startup_id
        self.extra_callback = extra_callback
        self.ctx: Optional['StartupCtx'] = None

    def __enter__(self) -> Callable[[int], None]:

        def pre_show_callback(window_handle: int) -> None:
            if self.extra_callback is not None:
                self.extra_callback(window_handle)
            if self.do_notify:
                self.ctx = init_startup_notification(window_handle, self.startup_id)

        return pre_show_callback

    def __exit__(self, *a: Any) -> None:
        if self.ctx is not None:
            end_startup_notification(self.ctx)


def unix_socket_directories() -> Iterator[str]:
    import tempfile
    home = os.path.expanduser('~')
    candidates = [tempfile.gettempdir(), home]
    if is_macos:
        from .fast_data_types import user_cache_dir
        candidates = [user_cache_dir(), '/Library/Caches']
    else:
        if os.environ.get('XDG_RUNTIME_DIR'):
            candidates.insert(0, os.environ['XDG_RUNTIME_DIR'])
    for loc in candidates:
        if os.access(loc, os.W_OK | os.R_OK | os.X_OK):
            yield loc


def unix_socket_paths(name: str, ext: str = '.lock') -> Generator[str, None, None]:
    home = os.path.expanduser('~')
    for loc in unix_socket_directories():
        filename = ('.' if loc == home else '') + name + ext
        yield os.path.join(loc, filename)


def parse_address_spec(spec: str) -> tuple[AddressFamily, tuple[str, int] | str, str | None]:
    import socket
    try:
        protocol, rest = spec.split(':', 1)
    except ValueError:
        raise ValueError(f'Invalid listen-on value: {spec} must be of the form protocol:address')
    socket_path = None
    address: str | tuple[str, int] = ''
    if protocol == 'unix':
        family = socket.AF_UNIX
        address = rest
        if address.startswith('@') and len(address) > 1:
            address = '\0' + address[1:]
        else:
            socket_path = address
    elif protocol in ('tcp', 'tcp6'):
        family = socket.AF_INET if protocol == 'tcp' else socket.AF_INET6
        if rest.startswith('['):  # ]
            host = rest[1:]
            host, sep, leftover = host.rpartition(']')
            _, port = leftover.rsplit(':', 1)
            if ':' in host and protocol == 'tcp':
                family = socket.AF_INET6
        else:
            host, port = rest.rsplit(':', 1)
        address = host, int(port)
    else:
        raise ValueError(f'Unknown protocol in listen-on value: {spec}')
    return family, address, socket_path


def parse_os_window_state(state: str) -> int:
    match state:
        case 'normal':
            return WINDOW_NORMAL
        case 'maximized':
            return WINDOW_MAXIMIZED
        case 'minimized':
            return WINDOW_MINIMIZED
        case 'fullscreen' | 'fullscreened':
            return WINDOW_FULLSCREEN
        case 'hidden':
            return WINDOW_HIDDEN
        case _:
            return WINDOW_NORMAL


def write_all(fd: int, data: str | bytes, block_until_written: bool = True) -> None:
    if isinstance(data, str):
        data = data.encode('utf-8')
    mvd = memoryview(data)
    while len(mvd) > 0:
        try:
            n = os.write(fd, mvd)
        except BlockingIOError:
            if not block_until_written:
                raise
            continue
        if not n:
            break
        mvd = mvd[n:]


class TTYIO:

    def __init__(self, read_with_timeout: bool = True):
        self.read_with_timeout = read_with_timeout

    def __enter__(self) -> 'TTYIO':
        self.tty_fd, self.original_termios = open_tty(self.read_with_timeout)
        return self

    def __exit__(self, *a: Any) -> None:
        from .fast_data_types import close_tty
        close_tty(self.tty_fd, self.original_termios)

    def wait_till_read_available(self) -> bool:
        if self.read_with_timeout:
            raise ValueError('Cannot wait when TTY is set to read with timeout')
        import select
        rd = select.select([self.tty_fd], [], [])[0]
        return bool(rd)

    def read(self, limit: int) -> bytes:
        return os.read(self.tty_fd, limit)

    def send(self, data: str | bytes | Iterable[str | bytes]) -> None:
        if isinstance(data, (str, bytes)):
            write_all(self.tty_fd, data)
        else:
            for chunk in data:
                write_all(self.tty_fd, chunk)

    def recv(self, more_needed: Callable[[bytes], bool], timeout: float, sz: int = 1) -> None:
        fd = self.tty_fd
        start_time = monotonic()
        while timeout > monotonic() - start_time:
            # will block for 0.1 secs waiting for data because we have set
            # VMIN=0 VTIME=1 in termios
            data = os.read(fd, sz)
            if data and not more_needed(data):
                break


def set_echo(fd: int = -1, on: bool = False) -> tuple[int, list[int | list[bytes | int]]]:
    import termios
    if fd < 0:
        fd = sys.stdin.fileno()
    old = termios.tcgetattr(fd)
    new = termios.tcgetattr(fd)
    if on:
        new[3] |= termios.ECHO
    else:
        new[3] &= ~termios.ECHO
    termios.tcsetattr(fd, termios.TCSADRAIN, new)
    return fd, old


@contextmanager
def no_echo(fd: int = -1) -> Iterator[None]:
    import termios
    fd, old = set_echo(fd)
    try:
        yield
    finally:
        termios.tcsetattr(fd, termios.TCSADRAIN, old)


def natsort_ints(iterable: Iterable[str]) -> list[str]:

    def convert(text: str) -> int | str:
        return int(text) if text.isdigit() else text

    def alphanum_key(key: str) -> tuple[int | str, ...]:
        return tuple(map(convert, re.split(r'(\d+)', key)))

    return sorted(iterable, key=alphanum_key)


def get_hostname(fallback: str = '') -> str:
    import socket
    try:
        return socket.gethostname() or fallback
    except Exception:
        return fallback


def resolve_editor_cmd(editor: str, shell_env: Mapping[str, str]) -> str | None:
    import shlex
    editor_cmd = list(shlex_split(editor))
    editor_exe = (editor_cmd or ('',))[0]
    if editor_exe and os.path.isabs(editor_exe):
        return editor
    if not editor_exe:
        return None

    def patched(exe: str) -> str:
        editor_cmd[0] = exe
        return ' '.join(map(shlex.quote, editor_cmd))

    if shell_env is os.environ:
        q = which(editor_exe, only_system=True)
        if q:
            return patched(q)
    elif 'PATH' in shell_env:
        import shutil
        q = shutil.which(editor_exe, path=shell_env['PATH'])
        if q:
            return patched(q)
    return None


def get_editor_from_env(env: Mapping[str, str]) -> str | None:
    for var in ('VISUAL', 'EDITOR'):
        editor = env.get(var)
        if editor:
            editor = resolve_editor_cmd(editor, env)
            if editor:
                return editor
    return None


def get_editor_from_env_vars(opts: Options | None = None) -> list[str]:
    editor = get_editor_from_env(os.environ)
    if not editor:
        shell_env = read_shell_environment(opts)
        editor = get_editor_from_env(shell_env)

    for ans in (editor, 'vim', 'nvim', 'vi', 'emacs', 'hx', 'kak', 'micro', 'nano', 'vis'):
        if ans and which(next(shlex_split(ans)), only_system=True):
            break
    else:
        ans = 'vim'
    return list(shlex_split(ans))


def get_editor(opts: Options | None = None, path_to_edit: str = '', line_number: int = 0) -> list[str]:
    if opts is None:
        try:
            opts = get_options()
        except RuntimeError:
            # we are in a kitten
            from .cli import create_default_opts
            opts = create_default_opts()
    if opts.editor == '.':
        ans = get_editor_from_env_vars()
    else:
        ans = list(shlex_split(opts.editor))
    ans[0] = os.path.expanduser(ans[0])
    if path_to_edit:
        if line_number:
            eq = os.path.basename(ans[0]).lower()
            if eq in ('code', 'code.exe'):
                path_to_edit += f':{line_number}'
                ans.append('--goto')
            else:
                ans.append(f'+{line_number}')
        ans.append(path_to_edit)
    return ans


def is_path_in_temp_dir(path: str) -> bool:
    if not path:
        return False

    def abspath(x: str | None) -> str:
        if x:
            x = os.path.abspath(os.path.realpath(x))
        return x or ''

    import tempfile
    path = abspath(path)
    candidates = frozenset(map(abspath, ('/tmp', '/dev/shm', os.environ.get('TMPDIR', None), tempfile.gettempdir())))
    for q in candidates:
        if q and path.startswith(q):
            return True
    return False


def is_ok_to_read_image_file(path: str, fd: int) -> bool:
    import stat
    path = os.path.abspath(os.path.realpath(path))
    try:
        path_stat = os.stat(path, follow_symlinks=True)
        fd_stat = os.fstat(fd)
    except OSError:
        return False
    if not os.path.samestat(path_stat, fd_stat):
        return False
    parts = path.split(os.sep)[1:]
    if len(parts) < 1:
        return False
    if parts[0] in ('sys', 'proc', 'dev'):
        if parts[0] == 'dev':
            return len(parts) > 2 and parts[1] == 'shm'
        return False
    return stat.S_ISREG(fd_stat.st_mode)


def resolve_abs_or_config_path(path: str, env: Mapping[str, str] | None = None, conf_dir: str | None = None) -> str:
    path = os.path.expanduser(path)
    path = expandvars(path, env or {})
    if not os.path.isabs(path):
        path = os.path.join(conf_dir or config_dir, path)
    return path


def resolve_custom_file(path: str) -> str:
    opts: Options | None = None
    with suppress(RuntimeError):
        opts = get_options()
    return resolve_abs_or_config_path(path, opts.env if opts else {})


def func_name(f: Any) -> str:
    if hasattr(f, '__name__'):
        return str(f.__name__)
    if hasattr(f, 'func') and hasattr(f.func, '__name__'):
        return str(f.func.__name__)
    return str(f)


def resolved_shell(opts: Options | None = None) -> list[str]:
    q: str = getattr(opts, 'shell', '.')
    if q == '.':
        ans = [shell_path]
    else:
        env = {}
        if opts is not None:
            env['TERM'] = opts.term
        if 'SHELL' not in os.environ:
            env['SHELL'] = shell_path
        if 'HOME' not in os.environ:
            env['HOME'] = os.path.expanduser('~')
        if 'USER' not in os.environ:
            import pwd
            env['USER'] = pwd.getpwuid(os.geteuid()).pw_name
        def expand(x: str) -> str:
            return expandvars(x, env)
        ans = list(map(expand, shlex_split(q)))
    return ans


@run_once
def system_paths_on_macos() -> tuple[str, ...]:
    entries, seen = [], set()

    def add_from_file(x: str) -> None:
        try:
            f = open(x)
        except (FileNotFoundError, PermissionError):
            return
        with f:
            for line in f:
                line = line.strip()
                if line and not line.startswith('#') and line not in seen:
                    if os.path.isdir(line):
                        seen.add(line)
                        entries.append(line)
    try:
        files = os.listdir('/etc/paths.d')
    except (FileNotFoundError, PermissionError):
        files = []
    for name in sorted(files):
        add_from_file(os.path.join('/etc/paths.d', name))
    add_from_file('/etc/paths')
    return tuple(entries)


def which(name: str, only_system: bool = False) -> str | None:
    if os.sep in name:
        return name
    import shutil

    opts: Options | None = None
    with suppress(RuntimeError):
        opts = get_options()

    tried_paths = set()
    paths = []
    append_paths = []
    if opts and opts.exe_search_path:
        for x in opts.exe_search_path:
            x = x.strip()
            if x:
                if x[0] == '-':
                    tried_paths.add(os.path.expanduser(x[1:]))
                elif x[0] == '+':
                    append_paths.append(os.path.expanduser(x[1:]))
                else:
                    paths.append(os.path.expanduser(x))
    ep = os.environ.get('PATH')
    if ep:
        paths.extend(ep.split(os.pathsep))
    paths.append(os.path.expanduser('~/.local/bin'))
    paths.append(os.path.expanduser('~/bin'))
    paths.extend(append_paths)
    ans = shutil.which(name, path=os.pathsep.join(x for x in paths if x not in tried_paths))
    if ans:
        return ans
    # In case PATH is messed up try a default set of paths
    if is_macos:
        system_paths = system_paths_on_macos()
    else:
        system_paths = ('/usr/local/bin', '/opt/bin', '/usr/bin', '/bin', '/usr/sbin', '/sbin')
    tried_paths |= set(paths)
    system_paths = tuple(x for x in system_paths if x not in tried_paths)
    if system_paths:
        ans = shutil.which(name, path=os.pathsep.join(system_paths))
        if ans:
            return ans
        tried_paths |= set(system_paths)
    if only_system or opts is None:
        return None
    shell_env = read_shell_environment(opts)
    for xenv in (shell_env, opts.env):
        q = xenv.get('PATH')
        if q:
            paths = [x for x in xenv['PATH'].split(os.pathsep) if x not in tried_paths]
            ans = shutil.which(name, path=os.pathsep.join(paths))
            if ans:
                return ans
            tried_paths |= set(paths)
    return None


def read_shell_environment(opts: Options | None = None) -> dict[str, str]:
    ans: dict[str, str] | None = getattr(read_shell_environment, 'ans', None)
    if ans is None:
        from .child import openpty
        ans = {}
        setattr(read_shell_environment, 'ans', ans)
        import subprocess
        shell = resolved_shell(opts)
        master, slave = openpty()
        os.set_blocking(master, False)
        if '-l' not in shell and '--login' not in shell:
            shell += ['-l']
        if '-i' not in shell and '--interactive' not in shell:
            shell += ['-i']
        try:
            p = subprocess.Popen(
                shell + ['-c', 'env'], stdout=slave, stdin=slave, stderr=slave, start_new_session=True, close_fds=True,
                preexec_fn=clear_handled_signals)
        except FileNotFoundError:
            log_error('Could not find shell to read environment')
            return ans
        with os.fdopen(master, 'rb') as stdout, os.fdopen(slave, 'wb'):
            raw = b''
            from time import monotonic
            start_time = monotonic()
            while monotonic() - start_time < 1.5:
                try:
                    ret: int | None = p.wait(0.01)
                except subprocess.TimeoutExpired:
                    ret = None
                with suppress(Exception):
                    raw += stdout.read()
                if ret is not None:
                    break
            if cast(Optional[int], p.returncode) is None:
                log_error('Timed out waiting for shell to quit while reading shell environment')
                p.kill()
            elif p.returncode == 0:
                while True:
                    try:
                        x = stdout.read()
                    except Exception:
                        break
                    if not x:
                        break
                    raw += x
                draw = raw.decode('utf-8', 'replace')
                for line in draw.splitlines():
                    k, v = line.partition('=')[::2]
                    if k and v:
                        ans[k] = v
            else:
                log_error('Failed to run shell to read its environment')
    return ans


def parse_uri_list(text: str) -> Generator[str, None, None]:
    ' Get paths from file:// URLs '
    from urllib.parse import unquote, urlparse
    for line in text.splitlines():
        if not line or line.startswith('#'):
            continue
        if not line.startswith('file://'):
            yield line
            continue
        try:
            purl = urlparse(line, allow_fragments=False)
        except Exception:
            yield line
            continue
        if purl.path:
            yield unquote(purl.path)


def edit_config_file() -> None:
    from kitty.config import prepare_config_file_for_editing
    p = prepare_config_file_for_editing()
    editor = get_editor()
    os.execvp(editor[0], editor + [p])


class SSHConnectionData(NamedTuple):
    binary: str
    hostname: str
    port: int | None = None
    identity_file: str = ''
    extra_args: tuple[tuple[str, str], ...] = ()


def get_new_os_window_size(
    metrics: 'OSWindowSize', width: int, height: int, unit: str, incremental: bool = False, has_window_scaling: bool = True
) -> tuple[int, int]:
    if unit == 'cells':
        cw = metrics['cell_width']
        ch = metrics['cell_height']
        width *= cw
        height *= ch
        if has_window_scaling:
            width = round(width / metrics['xscale'])
            height = round(height / metrics['yscale'])
    if incremental:
        w = metrics['width'] + width
        h = metrics['height'] + height
    else:
        w = width or metrics['width']
        h = height or metrics['height']
    return w, h


def get_all_processes() -> Iterable[int]:
    if is_macos:
        from kitty.fast_data_types import get_all_processes as f
        yield from f()
    else:
        for c in os.listdir('/proc'):
            if c.isdigit():
                yield int(c)


def is_kitty_gui_cmdline(*cmd: str) -> bool:
    if not cmd:
        return False
    if os.path.basename(cmd[0]) != 'kitty':
        return False
    if len(cmd) == 1:
        return True
    s = cmd[1][:1]
    if s == '@':
        return False
    if s == '+':
        if cmd[1] == '+':
            return len(cmd) > 2 and cmd[2] == 'open'
        return cmd[1] == '+open'
    return True


def reload_conf_in_all_kitties() -> None:
    import signal

    from kitty.child import cmdline_of_pid

    for pid in get_all_processes():
        try:
            cmd = cmdline_of_pid(pid)
        except Exception:
            continue
        if cmd and is_kitty_gui_cmdline(*cmd):
            os.kill(pid, signal.SIGUSR1)


@run_once
def control_codes_pat() -> 'Pattern[str]':
    return re.compile('[\x00-\x09\x0b-\x1f\x7f-\x9f]')


def sanitize_control_codes(text: str, replace_with: str = '') -> str:
    return control_codes_pat().sub(replace_with, text)


def hold_till_enter() -> None:
    import subprocess

    from .constants import kitten_exe
    subprocess.Popen([kitten_exe(), '__hold_till_enter__']).wait()


def cleanup_ssh_control_masters() -> None:
    import glob
    import subprocess
    try:
        files = frozenset(glob.glob(os.path.join(runtime_dir(), ssh_control_master_template.format(
            kitty_pid=os.getpid(), ssh_placeholder='*'))))
    except OSError:
        return
    workers = tuple(subprocess.Popen([
        'ssh', '-o', f'ControlPath={x}', '-O', 'exit', 'kitty-unused-host-name'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
        preexec_fn=clear_handled_signals) for x in files)
    for w in workers:
        w.wait()
    for x in files:
        with suppress(OSError):
            os.remove(x)


def path_from_osc7_url(url: str | bytes) -> str:
    if isinstance(url, bytes):
        url = url.decode('utf-8')
    if url.startswith('kitty-shell-cwd://'):
        return '/' + url.split('/', 3)[-1]
    if url.startswith('file://'):
        from urllib.parse import unquote, urlparse
        return unquote(urlparse(url).path)
    return ''


@run_once
def macos_version() -> tuple[int, ...]:
    # platform.mac_ver does not work thanks to Apple's stupid "hardening", so just use sw_vers
    import subprocess
    try:
        o = subprocess.check_output(['sw_vers', '-productVersion'], stderr=subprocess.STDOUT).decode()
    except Exception:
        return 0, 0, 0
    return tuple(map(int, o.strip().split('.')))


@lru_cache(maxsize=2)
def less_version(less_exe: str = 'less') -> int:
    import subprocess
    o = subprocess.check_output([less_exe, '-V'], stderr=subprocess.STDOUT).decode()
    m = re.match(r'less (\d+)', o)
    if m is None:
        raise ValueError(f'Invalid version string for less: {o}')
    return int(m.group(1))


def is_pid_alive(pid: int) -> bool:
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except Exception:
        pass
    return True


def safer_fork() -> int:
    pid = os.fork()
    if pid:
        # master
        import ssl
        ssl.RAND_add(os.urandom(32), 0.0)
    else:
        # child
        import atexit
        atexit._clear()
    return pid


def docs_url(which: str = '', local_docs_root: str | None = '') -> str:
    from urllib.parse import quote

    from .conf.types import resolve_ref
    from .constants import local_docs, website_url
    if local_docs_root is None:
        ld = ''
    else:
        ld = local_docs_root or local_docs()
    base, frag = which.partition('#')[::2]
    base = base.strip('/')
    if frag.startswith('ref='):
        ref = frag[4:]
        which = resolve_ref(ref, lambda x: x)
        if which.startswith('https://') or which.startswith('http://'):
            return which
        base, frag = which.partition('#')[::2]
        base = base.strip('/')
    if ld:
        base = base or 'index'
        url = f'file://{ld}/' + quote(base) + '.html'
    else:
        url = website_url(base)
    if frag:
        url += '#' + frag
    return url


def sanitize_for_bracketed_paste(text: bytes) -> bytes:
    pat = re.compile(b'(?:(?:\033\\\x5b)|(?:\x9b))201~')
    while True:
        new_text = pat.sub(b'', text)
        if new_text == text:
            break
        text = new_text
    return text


@lru_cache(maxsize=64)
def sanitize_url_for_display_to_user(url: str) -> str:
    from urllib.parse import unquote, urlparse, urlunparse
    try:
        purl = urlparse(url)
        if purl.netloc:
            purl = purl._replace(netloc=purl.netloc.encode('idna').decode('ascii'))
        if purl.path:
            purl = purl._replace(path=unquote(purl.path))
        url = urlunparse(purl)
    except Exception as e:
        log_error(e)
        url = 'Unparseable URL: ' + url
    return url


def extract_all_from_tarfile_safely(tf: 'tarfile.TarFile', dest: str) -> None:
    # Ensure that all extracted items are within dest

    def is_within_directory(directory: str, target: str) -> bool:
        abs_directory = os.path.abspath(directory)
        abs_target = os.path.abspath(target)
        prefix = os.path.commonprefix((abs_directory, abs_target))
        return prefix == abs_directory

    def safe_extract(tar: 'tarfile.TarFile', path: str = ".", numeric_owner: bool = False) -> None:
        for member in tar.getmembers():
            member_path = os.path.join(path, member.name)
            if not is_within_directory(path, member_path):
                raise ValueError(f'Attempted path traversal in tar file: {member.name}')
        tar.extractall(path, tar.getmembers(), numeric_owner=numeric_owner)

    safe_extract(tf, dest)


def is_png(path: str) -> bool:
    if path:
        with suppress(Exception), open(path, 'rb') as f:
            header = f.read(8)
            return header.startswith(b'\211PNG\r\n\032\n')
    return False


def cmdline_for_hold(cmd: Sequence[str] = (), opts: Optional['Options'] = None) -> list[str]:
    if opts is None:
        with suppress(RuntimeError):
            opts = get_options()
    if opts is None:
        from .options.types import defaults
        opts = defaults
    ksi = ' '.join(opts.shell_integration)
    import shlex
    shell = shlex.join(resolved_shell(opts))
    return [kitten_exe(), 'run-shell', f'--shell={shell}', f'--shell-integration={ksi}', '--env=KITTY_HOLD=1'] + list(cmd)


def safe_mtime(path: str) -> float | None:
    with suppress(OSError):
        return os.path.getmtime(path)
    return None


@run_once
def get_custom_window_icon() -> tuple[float, str] | tuple[None, None]:
    filenames = ['kitty.app.png']
    if is_macos:
        # On macOS, prefer icns to png.
        filenames.insert(0, 'kitty.app.icns')
    for name in filenames:
        custom_icon_path = os.path.join(config_dir, name)
        custom_icon_mtime = safe_mtime(custom_icon_path)
        if custom_icon_mtime is not None:
            return custom_icon_mtime, custom_icon_path
    return None, None


def key_val_matcher(items: Iterable[tuple[str, str]], key_pat: 're.Pattern[str]', val_pat: Optional['re.Pattern[str]']) -> bool:
    for key, val in items:
        if key_pat.search(key) is not None and (
                val_pat is None or val_pat.search(val) is not None):
            return True
    return False


def shlex_split(text: str, allow_ansi_quoted_strings: bool = False) -> Iterator[str]:
    yield from Shlex(text, allow_ansi_quoted_strings)


def shlex_split_with_positions(text: str, allow_ansi_quoted_strings: bool = False) -> Iterator[tuple[int, str]]:
    s = Shlex(text, allow_ansi_quoted_strings)
    while (q := s.next_word())[0] > -1:
        yield q


def timed_debug_print(*a: Any, sep: str = ' ', end: str = '\n') -> None:
    _timed_debug_print(sep.join(map(str, a)) + end)


def lock_file(f: BinaryIO) -> None:
    if not f.writable():
        raise ValueError('Cannot lock files not opened in writable mode')
    fcntl.lockf(f, fcntl.LOCK_EX)


def unlock_file(f: BinaryIO) -> None:
    if not f.writable():
        raise ValueError('Cannot unlock files not opened in writable mode')
    fcntl.lockf(f, fcntl.LOCK_UN)
